{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 编写自定义图分析算法"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Install graphscope package is you are NOT in the Playground\n",
    "\n",
    "!pip3 install graphscope\n",
    "GraphScope 的图分析引擎继承了 [GRAPE](https://dl.acm.org/doi/10.1145/3282488) ， 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。\n",
    "\n",
    "与以往的系统的不同，GRAPE 支持将串行图算法自动并行化。在 GRAPE 中， 只需进行很小的更改即可轻松地将串行算法即插即用，使其并行化的运行在分布式环境，并高效地处理大规模图数据。 除了易于编程外，GRAPE 还被设计为高效且可拓展的系统，可灵活应对现实中图应用多变的规模、多样性和复杂性。\n",
    "\n",
    "在这个教程中，我们将展示如何进行自定义基于 PIE 模型或者 Pregel 模型的图分析算法。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install graphscope package if you are NOT in the Playground\n",
    "\n",
    "!pip3 install graphscope"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 自定义基于 PIE 模型的图算法"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "GraphScope 支持用户使用纯 Python 语言，以 [PIE](https://dl.acm.org/doi/10.1145/3282488) 编程模型写图算法。首先我们需要导入 GraphScope 的包和 PIE 装饰器。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Import the graphscope module.\n",
    "\n",
    "import graphscope\n",
    "from graphscope.framework.app import AppAssets\n",
    "from graphscope.analytical.udf.decorators import pie\n",
    "\n",
    "\n",
    "graphscope.set_option(show_log=False)  # enable logging"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "以单源最短路径([SSSP](https://en.wikipedia.org/wiki/Shortest_path_problem))为例，编写 PIE 算法需要填写以下几个函数。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pie(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_PIE(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(frag, context):\n",
    "        pass\n",
    "\n",
    "    @staticmethod\n",
    "    def PEval(frag, context):\n",
    "        pass\n",
    "\n",
    "    @staticmethod\n",
    "    def IncEval(frag, context):\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "装饰器 **pie** 包含两个参数 `vd_type` 和 `md_type`, 他们分别代表节点上的数据类型和消息的数据类型。\n",
    "\n",
    "您可以为您的算法指定这两个数据类型，可用的数据类型包括 `int`，`double` 和 `string`。\n",
    "在我们这个示例中，由于 SSSP 计算的距离和发送的消息更新均是 `double` 类型，所以我们为这两个类型都指定为 `double`。\n",
    "\n",
    "在函数 `Init`，`PEval` 和 `IncEval`中，都有 **frag** 和 **context** 这两个参数。它们分别用于在算法逻辑中访问图数据和中间结果数据。可以查看文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 了解更多。\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义 Init 函数"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pie(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_PIE(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(frag, context):\n",
    "        v_label_num = frag.vertex_label_num()\n",
    "        for v_label_id in range(v_label_num):\n",
    "            nodes = frag.nodes(v_label_id)\n",
    "            context.init_value(\n",
    "                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n",
    "            )\n",
    "            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n",
    "\n",
    "    @staticmethod\n",
    "    def PEval(frag, context):\n",
    "        pass\n",
    "\n",
    "    @staticmethod\n",
    "    def IncEval(frag, context):\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`Init` 函数有以下几个职责：1）为每个节点设置初始值； 2）定义消息传递的策略； 3）指定聚合器，以在每一轮中处理收到的消息。\n",
    "  \n",
    "请注意，您定义的算法将在属性图上运行。所以我们应该首先通过 `v_label_num = frag.vertex_label_num()` 获得顶点标签，然后可以遍历所有具有相同标签的节点，\n",
    "并通过 `nodes = frag.nodes(v_label_id)` 和 `context.init_value(nodes, v_label_id, 1000000000.0，PIEAggregateType.kMinAggregate)` 设置初始值。\n",
    "\n",
    "由于我们正在计算源节点和其他节点之间的最短路径，在这里我们将 `PIEAggregateType.kMinAggregate` 用作消息聚合的聚合器。这意味着它将对所有收到的消息执行去 Min 操作。其他可用的聚合器包括 `kMaxAggregate`，`kSumAggregate`，`kProductAggregate` 和 `kOverwriteAggregate`。\n",
    "\n",
    "在 `Init` 函数的最后，我们向节点注册 `MessageStrategy.kSyncOnOuterVertex` 指定如何传递消息。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义 PEval 函数"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pie(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_PIE(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(frag, context):\n",
    "        v_label_num = frag.vertex_label_num()\n",
    "        for v_label_id in range(v_label_num):\n",
    "            nodes = frag.nodes(v_label_id)\n",
    "            context.init_value(\n",
    "                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n",
    "            )\n",
    "            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n",
    "\n",
    "    @staticmethod\n",
    "    def PEval(frag, context):\n",
    "        src = int(context.get_config(b\"src\"))\n",
    "        graphscope.declare(graphscope.Vertex, source)\n",
    "        native_source = False\n",
    "        v_label_num = frag.vertex_label_num()\n",
    "        for v_label_id in range(v_label_num):\n",
    "            if frag.get_inner_node(v_label_id, src, source):\n",
    "                native_source = True\n",
    "                break\n",
    "        if native_source:\n",
    "            context.set_node_value(source, 0)\n",
    "        else:\n",
    "            return\n",
    "        e_label_num = frag.edge_label_num()\n",
    "        for e_label_id in range(e_label_num):\n",
    "            edges = frag.get_outgoing_edges(source, e_label_id)\n",
    "            for e in edges:\n",
    "                dst = e.neighbor()\n",
    "                distv = e.get_int(2)\n",
    "                if context.get_node_value(dst) > distv:\n",
    "                    context.set_node_value(dst, distv)\n",
    "\n",
    "    @staticmethod\n",
    "    def IncEval(frag, context):\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在 **SSSP** 的 `PEval` 函数中, 通过函数 `context.get_config(b\"src\")` 获取用户查询的 src 节点。\n",
    "\n",
    "`PEval` 会在每个分区上调用 `frag.get_inner_node(v_label_id, src, source)` 的方法查询 src 节点是否在本分区。请注意这里函数 `get_inner_node` method needs a `source` 需要一个类型为 `Vertex` 的参数, 我们通过 `graphscope.declare(graphscope.Vertex, source)` 来定义声明。\n",
    " \n",
    "如果一个分区包含查询的源节点，它将调用 `frag.get_outgoing_edges（source，e_label_id）` 遍历源节点的所有输出边。 对于每个顶点，它计算到源的距离。如果该值小于初始值，则更新该值。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义 IncEval 函数\n",
    "\n",
    "**SSSP** 算法的 `IncEval` 和 `PEval` 之间唯一区别是 `IncEval` 将会在每个分区上都被调用。计算完 `IncEval` 后，将会产生一些消息，这些消息将被发送到其他分区供下一轮`IncEval` 调用。一直到所有分区都不再产生消息，此时算法结束。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pie(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_PIE(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(frag, context):\n",
    "        v_label_num = frag.vertex_label_num()\n",
    "        for v_label_id in range(v_label_num):\n",
    "            nodes = frag.nodes(v_label_id)\n",
    "            context.init_value(\n",
    "                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n",
    "            )\n",
    "            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n",
    "\n",
    "    @staticmethod\n",
    "    def PEval(frag, context):\n",
    "        src = int(context.get_config(b\"src\"))\n",
    "        graphscope.declare(graphscope.Vertex, source)\n",
    "        native_source = False\n",
    "        v_label_num = frag.vertex_label_num()\n",
    "        for v_label_id in range(v_label_num):\n",
    "            if frag.get_inner_node(v_label_id, src, source):\n",
    "                native_source = True\n",
    "                break\n",
    "        if native_source:\n",
    "            context.set_node_value(source, 0)\n",
    "        else:\n",
    "            return\n",
    "        e_label_num = frag.edge_label_num()\n",
    "        for e_label_id in range(e_label_num):\n",
    "            edges = frag.get_outgoing_edges(source, e_label_id)\n",
    "            for e in edges:\n",
    "                dst = e.neighbor()\n",
    "                distv = e.get_int(2)\n",
    "                if context.get_node_value(dst) > distv:\n",
    "                    context.set_node_value(dst, distv)\n",
    "\n",
    "    @staticmethod\n",
    "    def IncEval(frag, context):\n",
    "        v_label_num = frag.vertex_label_num()\n",
    "        e_label_num = frag.edge_label_num()\n",
    "        for v_label_id in range(v_label_num):\n",
    "            iv = frag.inner_nodes(v_label_id)\n",
    "            for v in iv:\n",
    "                v_dist = context.get_node_value(v)\n",
    "                for e_label_id in range(e_label_num):\n",
    "                    es = frag.get_outgoing_edges(v, e_label_id)\n",
    "                    for e in es:\n",
    "                        u = e.neighbor()\n",
    "                        u_dist = v_dist + e.get_int(2)\n",
    "                        if context.get_node_value(u) > u_dist:\n",
    "                            context.set_node_value(u, u_dist)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 在图上调用您的算法"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load p2p network dataset\n",
    "\n",
    "from graphscope.dataset import load_p2p_network\n",
    "\n",
    "graph = load_p2p_network(directed=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后，初始化刚才定义的算法，并查询起始点为 `6` 的最短路径。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sssp = SSSP_PIE()\n",
    "ctx = sssp(graph, src=6)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "执行这个 Cell，您的算法应该被成功执行。\n",
    "此时，结果存在于分布式的 pod 内存中，由 vineyard 在进行管理。我们可以通过以下方法将远程数据取过来并展示。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "r1 = (\n",
    "    ctx.to_dataframe({\"node\": \"v:host.id\", \"r\": \"r:host\"})\n",
    "    .sort_values(by=[\"node\"])\n",
    "    .to_numpy(dtype=float)\n",
    ")\n",
    "r1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 保存和载入您的算法\n",
    "\n",
    "您可以保存您的自定义算法供之后使用。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# specify the path you want to dump\n",
    "dump_path = os.path.expanduser(\"~/sssp_pie.gar\")\n",
    "\n",
    "# dump\n",
    "SSSP_PIE.to_gar(dump_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "现在 您可以在目录 `~/` 下看到您的算法包 `sssp_pie.gar`。下次再次使用时，只需要这样载入："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from graphscope.framework.app import load_app\n",
    "\n",
    "# specify the path you want to dump\n",
    "dump_path = os.path.expanduser(\"~/sssp_pie.gar\")\n",
    "\n",
    "sssp2 = load_app(dump_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 基于 Pregel 模型编写算法\n",
    "\n",
    "除了上面的 PIE 模型，您也可以通过基于点中心编程的 Pregel 模型编写自己的算法。首先我们需要导入 GraphScope 包和 **pregel** 装饰器。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import graphscope\n",
    "\n",
    "from graphscope.framework.app import AppAssets\n",
    "from graphscope.analytical.udf.decorators import pregel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pregel(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_Pregel(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(v, context):\n",
    "        pass\n",
    "\n",
    "    @staticmethod\n",
    "    def Compute(messages, v, context):\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**pregel** 装饰器也有两个类型参数 `vd_type` 和 `md_type`，分别用于指定点上的数据类型和消息的类型，支持的类型包括 `int`，`double` 和 `string`。\n",
    "在我们的 **SSSP** 例子中，我们将这两个值都设置为 `double`。\n",
    "\n",
    "由于 Pregel 模型的算法逻辑是定义在点上的，它的 `Init` 和 `Compute` 函数有一个参数 `v` 用于访问点上的数据。查看文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 了解更多。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义 Init 函数"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pregel(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_Pregel(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(v, context):\n",
    "        v.set_value(1000000000.0)\n",
    "\n",
    "    @staticmethod\n",
    "    def Compute(messages, v, context):\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`Init` 函数将每个点上的初始路径设置为极大值。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 定义 Compute 函数\n",
    "\n",
    "**SSSP** 的 `Compute` 函数通过以下步骤为每个节点计算新距离：\n",
    "\n",
    "1）首先初始化值为1000000000的新值    \n",
    "2）如果顶点是源节点，则将其距离设置为0    \n",
    "3）计算接收到的消息的 Min 值，如果该值小于当前值，则设置该值。  \n",
    "\n",
    "重复这些步骤，直到不再生成新消息（距离更短）为止。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pregel(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_Pregel(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(v, context):\n",
    "        v.set_value(1000000000.0)\n",
    "\n",
    "    @staticmethod\n",
    "    def Compute(messages, v, context):\n",
    "        src_id = context.get_config(b\"src\")\n",
    "        cur_dist = v.value()\n",
    "        new_dist = 1000000000.0\n",
    "        if v.id() == src_id:\n",
    "            new_dist = 0\n",
    "        for message in messages:\n",
    "            new_dist = min(message, new_dist)\n",
    "        if new_dist < cur_dist:\n",
    "            v.set_value(new_dist)\n",
    "            for e_label_id in range(context.edge_label_num()):\n",
    "                edges = v.outgoing_edges(e_label_id)\n",
    "                for e in edges:\n",
    "                    v.send(e.vertex(), new_dist + e.get_int(2))\n",
    "        v.vote_to_halt()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 可选的 Combiner 函数\n",
    "\n",
    "我们可以定义一个 Combiner 以减少消息通信的开销，注意这个 Combiner 函数是可选的，您也可以不实现。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pregel(vd_type=\"double\", md_type=\"double\")\n",
    "class SSSP_Pregel(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(v, context):\n",
    "        v.set_value(1000000000.0)\n",
    "\n",
    "    @staticmethod\n",
    "    def Compute(messages, v, context):\n",
    "        src_id = context.get_config(b\"src\")\n",
    "        cur_dist = v.value()\n",
    "        new_dist = 1000000000.0\n",
    "        if v.id() == src_id:\n",
    "            new_dist = 0\n",
    "        for message in messages:\n",
    "            new_dist = min(message, new_dist)\n",
    "        if new_dist < cur_dist:\n",
    "            v.set_value(new_dist)\n",
    "            for e_label_id in range(context.edge_label_num()):\n",
    "                edges = v.outgoing_edges(e_label_id)\n",
    "                for e in edges:\n",
    "                    v.send(e.vertex(), new_dist + e.get_int(2))\n",
    "        v.vote_to_halt()\n",
    "\n",
    "    @staticmethod\n",
    "    def Combine(messages):\n",
    "        ret = 1000000000.0\n",
    "        for m in messages:\n",
    "            ret = min(ret, m)\n",
    "        return ret"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 运行您的 Pregel 算法\n",
    "\n",
    "接下来让我们运行基于 Pregel 写的 SSSP 算法和查看结果。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sssp_pregel = SSSP_Pregel()\n",
    "ctx = sssp_pregel(graph, src=6)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "r2 = (\n",
    "    ctx.to_dataframe({\"node\": \"v:host.id\", \"r\": \"r:host\"})\n",
    "    .sort_values(by=[\"node\"])\n",
    "    .to_numpy(dtype=float)\n",
    ")\n",
    "r2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Pregel 模型中的 Aggregator"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Pregel 模型中的 aggregator 聚合器是一种用于全局通信、监视和计数的机制。 \n",
    "\n",
    "每个顶点都可以在超步 `S` 中为聚合器提供值，系统将这些值聚合在一起，使用归约算符对这些值进行计算，并在超步 `S+1` 中将所得值提供给所有顶点。\n",
    "GraphScope 为 Pregel 算法提供了许多预定义的聚合器，例如对数据类型的 `min`，`max` 或 `sum` 等。\n",
    "\n",
    "这里是使用内置 aggregator 的示例，您也可以在文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 中了解更多"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@pregel(vd_type=\"double\", md_type=\"double\")\n",
    "class Aggregators_Pregel_Test(AppAssets):\n",
    "    @staticmethod\n",
    "    def Init(v, context):\n",
    "        # int\n",
    "        context.register_aggregator(\n",
    "            b\"int_sum_aggregator\", PregelAggregatorType.kInt64SumAggregator\n",
    "        )\n",
    "        context.register_aggregator(\n",
    "            b\"int_max_aggregator\", PregelAggregatorType.kInt64MaxAggregator\n",
    "        )\n",
    "        context.register_aggregator(\n",
    "            b\"int_min_aggregator\", PregelAggregatorType.kInt64MinAggregator\n",
    "        )\n",
    "        # double\n",
    "        context.register_aggregator(\n",
    "            b\"double_product_aggregator\", PregelAggregatorType.kDoubleProductAggregator\n",
    "        )\n",
    "        context.register_aggregator(\n",
    "            b\"double_overwrite_aggregator\",\n",
    "            PregelAggregatorType.kDoubleOverwriteAggregator,\n",
    "        )\n",
    "        # bool\n",
    "        context.register_aggregator(\n",
    "            b\"bool_and_aggregator\", PregelAggregatorType.kBoolAndAggregator\n",
    "        )\n",
    "        context.register_aggregator(\n",
    "            b\"bool_or_aggregator\", PregelAggregatorType.kBoolOrAggregator\n",
    "        )\n",
    "        context.register_aggregator(\n",
    "            b\"bool_overwrite_aggregator\", PregelAggregatorType.kBoolOverwriteAggregator\n",
    "        )\n",
    "        # text\n",
    "        context.register_aggregator(\n",
    "            b\"text_append_aggregator\", PregelAggregatorType.kTextAppendAggregator\n",
    "        )\n",
    "\n",
    "    @staticmethod\n",
    "    def Compute(messages, v, context):\n",
    "        if context.superstep() == 0:\n",
    "            context.aggregate(b\"int_sum_aggregator\", 1)\n",
    "            context.aggregate(b\"int_max_aggregator\", int(v.id()))\n",
    "            context.aggregate(b\"int_min_aggregator\", int(v.id()))\n",
    "            context.aggregate(b\"double_product_aggregator\", 1.0)\n",
    "            context.aggregate(b\"double_overwrite_aggregator\", 1.0)\n",
    "            context.aggregate(b\"bool_and_aggregator\", True)\n",
    "            context.aggregate(b\"bool_or_aggregator\", False)\n",
    "            context.aggregate(b\"bool_overwrite_aggregator\", True)\n",
    "            context.aggregate(b\"text_append_aggregator\", v.id() + b\",\")\n",
    "        else:\n",
    "            if v.id() == b\"1\":\n",
    "                assert context.get_aggregated_value(b\"int_sum_aggregator\") == 62586\n",
    "                assert context.get_aggregated_value(b\"int_max_aggregator\") == 62586\n",
    "                assert context.get_aggregated_value(b\"int_min_aggregator\") == 1\n",
    "                assert context.get_aggregated_value(b\"double_product_aggregator\") == 1.0\n",
    "                assert (\n",
    "                    context.get_aggregated_value(b\"double_overwrite_aggregator\") == 1.0\n",
    "                )\n",
    "                assert context.get_aggregated_value(b\"bool_and_aggregator\") == True\n",
    "                assert context.get_aggregated_value(b\"bool_or_aggregator\") == False\n",
    "                assert (\n",
    "                    context.get_aggregated_value(b\"bool_overwrite_aggregator\") == True\n",
    "                )\n",
    "                context.get_aggregated_value(b\"text_append_aggregator\")\n",
    "            v.vote_to_halt()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
